<?php

/**
 * Copyright (c) 2020
 * 摘    要：
 * 作    者：san
 * 修改日期：2020.03.30
 */

namespace App\Controller\WebSocket;

use App\Library\Clog\Log;
use App\Service\DeployService;
use App\Service\UserService;
use ErrorException;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Hyperf\Utils\Parallel;
use Swoole\Http\Request;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
use Swoole\Server;

class IndexController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    const TASK_DEPLOY_INFO = 'task_deploy_info:';
    const TASK_DEPLOY      = 'task_id:';

    const INFO_TTL   = 60 * 60 * 24;
    const DEPLOY_TTL = 5 * 60;

    /**
     * @Inject()
     * @var  UserService
     */
    protected $userService;

    /**
     * @Inject()
     * @var  DeployService
     */
    protected $deployService;

    protected $redis;

    public $uid;

    public $taskId;

    /**
     * IndexController constructor.
     */
    public function __construct()
    {
        $this->redis = redis();
    }

    /**
     * 客户端连接
     *
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        // 1、权限校验
        $this->uid = $this->_checkPermission($server, $request);
        // 2、上一次执行记录
        $this->taskId = $request->get['task_id'] ?? 0;
        if ($this->taskId) {
            try {
                $record = $this->deployService->getTaskDeployRecord($this->taskId, $this->uid);
                $record && $this->_handlePushStepInfo($server, $request, $record);
            } catch (\Exception $exception) {
                $server->push($request->fd, json_encode(['cmd' => 'info', 'data' => $exception->getMessage()]));
            }
        }
    }

    /**
     * 接收消息
     *
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        try {
            $taskId   = 0;
            $taskData = json_decode($frame->data, true);
            $cmd      = $taskData['cmd'];
            if ($cmd == 'HeartBeat') {
                $server->push($frame->fd, json_encode(['cmd' => 'HeartBeatRecv']));
            } else {
                $taskId = $taskData['task_id'];
                $hasOne = $this->redis->incr(self::TASK_DEPLOY . $taskId);
                if ($hasOne > 1) {
                    $server->push($frame->fd, json_encode(['cmd' => 'info', 'data' => '当前任务已在部署，请勿重复提交']));
                } else {
                    $this->_handleDeploy($server, $frame->fd, $taskId);
                }
            }
        } catch (\Exception $exception) {
            $taskId && $this->redis->del(self::TASK_DEPLOY . $taskId);
            $server->push($frame->fd, json_encode(['cmd' => 'info', 'data' => 'system error']));
        }
    }

    /**
     * 客户端断开
     *
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        var_dump(json_encode(['message' => $fd . 'Close']));
    }


    /**
     * 处理上次部署错误信息
     *
     * @param $server
     * @param $request
     * @param $record
     */
    private function _handlePushStepInfo($server, $request, $record)
    {
        foreach ($record as $value) {
            $cmd  = str_replace(['\'', PHP_EOL], '', $value['command']);
            $memo = str_replace(['\'', PHP_EOL], '', $value['memo']);
            $push = [
                'user'    => getenv('USER'),
                'ip'      => '127.0.0.1',
                'command' => $cmd,
                'status'  => $value['status'],
                'percent' => $value['action'],
                'success' => $value['status'] ? $memo : '',
                'error'   => !$value['status'] ? $memo : '',
            ];
            $server->push($request->fd, json_encode(['cmd' => 'deploy', 'data' => $push]));
        }
    }

    /**
     * 检测是否登录状态
     *
     * @param WebSocketServer $server
     * @param Request $request
     * @return bool|mixed
     */
    private function _checkPermission(WebSocketServer $server, Request $request)
    {
        try {
            if (!isset($request->get['access_token'])) {
                throw new ErrorException("token丢失");
            }
            $token = $request->get['access_token'];
            return $this->userService->tokenCheck($token);
        } catch (\Exception $exception) {
            $server->push($request->fd, json_encode(['cmd' => 'info', 'data' => $exception->getMessage()]));
            Log::exception($exception);
            $server->close($request->fd);
        }
    }

    /**
     * 发布处理
     *
     * @param WebSocketServer $server
     * @param $fd
     * @param $taskId
     * @return void
     */
    private function _handleDeploy(WebSocketServer $server, $fd, $taskId)
    {
        try {
            $parallel = new Parallel();
            // 1、检测宿主机检出目录是否可读写
            $parallel->add(function () use ($server, $fd) {
                try {
                    $this->deployService->deploy($this->taskId, $this->uid);
                } catch (\Exception $exception) {
                    $server->push($fd, json_encode(['cmd' => 'info', 'data' => $exception->getMessage()]));
                }
            });
            // 2、定时器 查询执行记录
            $parallel->add(function () use ($server, $fd, $taskId) {
                swoole_timer_tick(1000, function ($timer_id) use ($server, $fd, $taskId) {
                    $lastId = $this->redis->get('push_key' . $taskId) ?? 0;
                    $data   = $this->deployService->getProcess($taskId, $lastId);
                    if (count($data)) {
                        $cmd  = str_replace(['\'', PHP_EOL], '', $data['command']);
                        $memo = str_replace(['\'', PHP_EOL], '', $data['memo']);
                        $push = [
                            'user'    => getenv('USER'),
                            'ip'      => '127.0.0.1',
                            'command' => $cmd,
                            'status'  => $data['status'],
                            'percent' => $data['action'],
                            'success' => $data['status'] ? $memo : '',
                            'error'   => !$data['status'] ? $memo : '',
                        ];

                        if ($lastId != $data['id']) {
                            $server->push($fd, json_encode(['cmd' => 'deploy', 'data' => $push]));
                            $this->redis->set('push_key' . $taskId, $data['id']);
                            $this->redis->hSet(self::TASK_DEPLOY_INFO . $taskId, (string)$data['id'], json_encode($push));
                        }
                        //清除定时器
                        if ($data['status'] == 0 || $data['action'] == 100) {
                            $this->redis->del('push_key' . $taskId);
                            swoole_timer_clear($timer_id);
                        }
                    }
                    $this->redis->del(self::TASK_DEPLOY . $taskId);
                });
                $this->redis->expire(self::TASK_DEPLOY_INFO . $taskId, self::INFO_TTL);
            });
            try {
                $result = $parallel->wait();
                Log::info('websocket', 'handleDeploy', $result);
            } catch (ParallelExecutionException $e) {
                Log::info('websocket', 'handleDeploy-error', ['result' => $e->getResults(), 'error' => $e->getThrowables()]);
            }
        } catch (\Exception $exception) {
            $server->push($fd, json_encode(['cmd' => 'info', 'data' => $exception->getMessage()]));
        }
    }
}
